// mmo_game_server/data/kafka_consumer.go
package data

import (
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

// KafkaConsumer 初始化Kafka消费者
func KafkaConsumer() *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "player_data",
        GroupID: "player_data_group",
    })
}

// ConsumeMessage 从Kafka消费消息
func ConsumeMessage() {
    r := KafkaConsumer()
    defer r.Close()
    for {
        msg, err := r.ReadMessage(ctx)
        if err != nil {
            log.Printf("Failed to consume message from Kafka: %v", err)
            break
        }
        fmt.Printf("Received message: %s from topic: %s\n", string(msg.Value), msg.Topic)
    }
}